import java.util.Arrays;
import org.apache.flink.types.Row;
import org.apache.flink.table.api.Table;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;


/**
 * Small batch Table API demo: applies a user-defined table function to the
 * {@code product} column of an in-memory collection of orders via
 * {@code Table.flatMap} and prints the resulting rows.
 *
 * <p>{@code Order} and {@code MyFlatMapFunction} are declared elsewhere in the project.
 */
public class FlatMap
{

    /**
     * Builds the sample data set, runs the flatMap and prints the result to stdout.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception
    {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // Register the table function under the name used by call(...) below.
        TableFunction<Row> func = new MyFlatMapFunction();
        tEnv.registerFunction("func", func);

        DataSet<Order> ds1 = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)
        ));

        Table input = tEnv.fromDataSet(ds1, $("user"), $("product"), $("amount"));

        // flatMap/as return a NEW Table, so the result has to be captured;
        // printing `input` would only show the untouched orders.
        // The argument must be the column reference $("product"): a bare Java
        // String would be passed to the function as a constant literal.
        // NOTE(review): as("a", "b") assumes MyFlatMapFunction emits two-field rows - confirm.
        Table result = input.flatMap(call("func", $("product"))).as("a", "b");

        tEnv.toDataSet(result, Row.class).print();
    }

}


// Important reference:
//https://blog.csdn.net/weixin_38255219/article/details/107066542
